package com.ddone;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Demo consumer showing the effect of the {@code auto.offset.reset} setting.
 *
 * <p>Subscribes to {@link #TOPIC_NAME}, polls in an endless loop and prints the
 * offset, key and value of every record received.
 *
 * @author ddone
 * @date 2024/5/13-19:16
 */
public class ConsumerOffSetReset {
    static final String TOPIC_NAME = "hello-kafka";

    /**
     * Configures a String/String consumer, subscribes to {@link #TOPIC_NAME} and
     * prints every record it receives. The loop never exits normally; the
     * consumer is closed if polling throws.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // 1. Build the Kafka consumer configuration.
        Properties properties = new Properties();
        // 2. Broker address used for the initial connection (bootstrap.servers).
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "senior77");
        // Auto-commit is enabled here. If it is switched to "false", offsets must be
        // committed manually, otherwise messages will be consumed again.
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Interval between automatic offset commits, in milliseconds.
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // auto.offset.reset accepts: none, latest, earliest.
        //   none:     with a new consumer group that has no stored offset, a
        //             NoOffsetForPartitionException is thrown
        //   latest:   reset the offset to the latest one
        //   earliest: reset the offset to the earliest one
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        // try-with-resources guarantees the consumer is closed when poll() throws
        // (e.g. the exception described above for "none"), instead of leaking it.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(List.of(TOPIC_NAME));
            while (true) {
                System.out.println("....进行中");
                // Wait up to one second for the next batch of records.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                // Print every record that was fetched.
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
